package org.codehaus.activemq.store.jdbc;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueMessageContainer;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.util.JMSExceptionHelper;

public class JDBCMessageStore implements MessageStore {
	private static final Log log = LogFactory.getLog(JDBCMessageStore.class);
	protected final WireFormat wireFormat;
	protected final String destinationName;
	protected final SequenceGenerator sequenceGenerator;
	protected final JDBCAdapter adapter;

	public JDBCMessageStore(JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
		this.adapter = adapter;
		this.sequenceGenerator = adapter.getSequenceGenerator();
		this.wireFormat = wireFormat;
		this.destinationName = destinationName;
	}

	public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
		String messageID = message.getJMSMessageID();
		byte[] data;
		try {
			data = this.wireFormat.toBytes(message);
		} catch (IOException e) {

			throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e,
					e);
		}

		long seq = this.sequenceGenerator.getNextSequenceId();
		try {
			Connection c = TransactionContext.getConnection();
			this.adapter.doAddMessage(c, seq, messageID, this.destinationName, data);
		} catch (SQLException e) {
			throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e,
					e);
		}

		MessageIdentity answer = message.getJMSMessageIdentity();
		answer.setSequenceNumber(new Long(seq));
		return answer;
	}

	public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
		long id = ((Long) identity.getSequenceNumber()).longValue();
		try {
			Connection c = TransactionContext.getConnection();
			byte[] data = this.adapter.doGetMessage(c, id);
			return (ActiveMQMessage) this.wireFormat.fromBytes(data);
		} catch (IOException e) {
			throw JMSExceptionHelper
					.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
		} catch (SQLException e) {
			throw JMSExceptionHelper
					.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);

		}
	}

	public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
		long seq = ((Long) identity.getSequenceNumber()).longValue();
		try {
			Connection c = TransactionContext.getConnection();
			this.adapter.doRemoveMessage(c, seq);
		} catch (SQLException e) {
			throw JMSExceptionHelper
					.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
		}
	}

	public void recover(QueueMessageContainer container) throws JMSException {
		try {
			Connection c = TransactionContext.getConnection();
			this.adapter.doRecover(c, this.destinationName, new JDBCAdapter.MessageListResultHandler() {
				private final QueueMessageContainer val$container=container;

				public void onMessage(long seq, String messageID) throws JMSException {
					this.val$container.recoverMessageToBeDelivered(new MessageIdentity(messageID, new Long(seq)));
				}
			});
		} catch (SQLException e) {
			throw JMSExceptionHelper.newJMSException("Failed to recover container. Reason: " + e, e);
		}
	}

	public void start() throws JMSException {
	}

	public void stop() throws JMSException {
	}
}